#!/usr/bin/python3

# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option) any
# later version.  See http://www.gnu.org/copyleft/lgpl.html for the full text
# of the license.

__author__ = 'Bastien Nocera'
__email__ = 'hadess@hadess.net'
__copyright__ = '(c) 2021 Red Hat Inc.'
__license__ = 'LGPL 3+'

import unittest
import sys
import subprocess
import fcntl
import os
import time

import taptestrunner

try:
    # Do all non-standard imports here so we can skip the tests if any
    # needed packages are not available.
    import dbus
    import dbus.mainloop.glib
    import dbusmock
    from gi.repository import GLib
    from gi.repository import Gio
    from gi.repository import GObject

    # Install dbus-python's GLib main loop integration as the default main
    # loop for D-Bus connections created by this module.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    # Path of the xdg-desktop-portal executable launched by setUp().
    # NOTE(review): "@libexecdir@" looks like a placeholder that is substituted
    # when the test is generated by the build system -- confirm.
    # Alternative kept for running against a local jhbuild checkout:
    # XDG_DESKTOP_PORTAL_PATH = os.path.expanduser("~/.cache/jhbuild/build/xdg-desktop-portal/xdg-desktop-portal")
    XDG_DESKTOP_PORTAL_PATH = "@libexecdir@/xdg-desktop-portal"

    class TestPowerProfileMonitorPortal(dbusmock.DBusTestCase):
        '''Test GPowerProfileMonitorPortal'''

        @classmethod
        def setUpClass(cls):
            '''Start the private D-Bus daemons shared by all tests of the class.

            A system bus is started first and a connection to it is stored
            in ``dbus_con``; a session bus is started afterwards.
            '''
            cls.start_system_bus()
            # True asks dbusmock for the system bus rather than the session bus.
            use_system_bus = True
            cls.dbus_con = cls.get_dbus(use_system_bus)
            # xdg-desktop-portal runs on the session bus, so it needs one too.
            cls.start_session_bus()

        def setUp(self):
            '''Start the power-profiles-daemon mock and xdg-desktop-portal.

            The test is skipped when Gio.PowerProfileMonitor is missing from
            the introspection data, when the dbusmock template cannot be
            found, or when the xdg-desktop-portal executable does not exist.

            Each child process is registered with addCleanup() right after it
            is spawned. unittest runs cleanups even when setUp() fails or
            skips (tearDown() is not called in that case), so neither process
            outlives the test, whatever step goes wrong afterwards.
            '''
            def stop_process(process):
                # Safe to call repeatedly: terminate() and wait() are no-ops
                # for a process that has already been reaped, and closing an
                # already closed pipe does nothing.
                process.terminate()
                process.wait()
                if process.stdout is not None:
                    process.stdout.close()

            try:
                Gio.PowerProfileMonitor
            except AttributeError:
                raise unittest.SkipTest('Power Profile Monitor not in '
                                        'introspection data. Requires '
                                        'GObject-Introspection > 1.69.0')
            try:
                (self.p_mock, self.obj_ppd) = self.spawn_server_template(
                    'power_profiles_daemon', {}, stdout=subprocess.PIPE)
            except ModuleNotFoundError:
                raise unittest.SkipTest("power-profiles-daemon dbusmock template not "
                                        "found. Requires dbusmock > 0.23.1.")
            self.addCleanup(stop_process, self.p_mock)

            # set log to nonblocking
            flags = fcntl.fcntl(self.p_mock.stdout, fcntl.F_GETFL)
            fcntl.fcntl(self.p_mock.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
            self.power_saver_enabled = False
            self.dbus_props = dbus.Interface(self.obj_ppd, dbus.PROPERTIES_IFACE)

            # GIO_USE_PORTALS is exported further down for this process only;
            # keep it out of the portal's own environment so that a portal
            # started for a later test does not inherit it.
            portal_env = {name: value for name, value in os.environ.items()
                          if name != 'GIO_USE_PORTALS'}
            try:
                self.xdp = subprocess.Popen([XDG_DESKTOP_PORTAL_PATH],
                                            env=portal_env)
            except FileNotFoundError:
                raise unittest.SkipTest("xdg-desktop-portal not available")
            # Cleanups run in reverse order: the portal is stopped before the
            # mock it talks to.
            self.addCleanup(stop_process, self.xdp)

            self.wait_for_bus_object('org.freedesktop.portal.Desktop',
                                     '/org/freedesktop/portal/desktop')
            # Debugging aid, uncomment to watch the portal's D-Bus traffic:
            # subprocess.Popen(['gdbus', 'monitor', '--session', '--dest', 'org.freedesktop.portal.Desktop'])

            os.environ['GIO_USE_PORTALS'] = "1"
            self.power_profile_monitor = Gio.PowerProfileMonitor.dup_default()
            # assertIn rather than a bare assert, which is stripped under -O.
            self.assertIn("GPowerProfileMonitorPortal",
                          str(self.power_profile_monitor))
            self.power_profile_monitor.connect("notify::power-saver-enabled", self.power_saver_enabled_cb)
            self.mainloop = GLib.MainLoop()
            self.main_context = self.mainloop.get_context()

        def tearDown(self):
            '''Stop the power-profiles-daemon mock started by setUp().

            The mock is terminated and reaped, then this process's end of
            its stdout pipe is closed so that the file descriptor is not left
            open after the test.
            '''
            self.p_mock.terminate()
            self.p_mock.wait()
            # setUp() spawns the mock with stdout=subprocess.PIPE and nothing
            # else closes that pipe.
            if self.p_mock.stdout is not None:
                self.p_mock.stdout.close()

        def assertEventually(self, condition, message=None, timeout=50):
            '''Fail unless condition() becomes true before the timeout expires.

            Pending events of the default GLib main context are dispatched
            before every evaluation of condition. timeout is expressed in
            deciseconds (the default of 50 means 5 seconds); message, when
            given, is the failure text.
            '''
            remaining = timeout
            while remaining >= 0:
                main_context = GLib.MainContext.default()
                # Drain everything that is ready without blocking.
                dispatched = True
                while dispatched:
                    dispatched = main_context.iteration(False)
                if condition():
                    return
                remaining -= 1
                time.sleep(0.1)

            if not message:
                message = 'timed out waiting for ' + str(condition)
            self.fail(message)

        def power_saver_enabled_cb(self, spec, data):
            '''Handler connected to "notify::power-saver-enabled" in setUp().

            Stores the monitor's current power-saver state on the test case
            and wakes up the main context. Both signal arguments are ignored.
            '''
            monitor = self.power_profile_monitor
            self.power_saver_enabled = monitor.get_power_saver_enabled()
            self.main_context.wakeup()

        def test_power_profile_power_saver_enabled_portal(self):
            '''power-saver-enabled property'''

            def switch_profile(profile):
                # Change the mock daemon's ActiveProfile property over D-Bus.
                self.dbus_props.Set('net.hadess.PowerProfiles', 'ActiveProfile',
                                    dbus.String(profile, variant_level=1))

            # Initial state: the monitor must report power-saver as disabled.
            self.assertEqual(self.power_profile_monitor.get_power_saver_enabled(), False)

            # The notify callback must eventually observe the switch...
            switch_profile('power-saver')
            self.assertEventually(lambda: self.power_saver_enabled == True,
                                  "power-saver didn't become enabled", 10)

            # ...and the switch back to a non power-saver profile.
            switch_profile('balanced')
            self.assertEventually(lambda: self.power_saver_enabled == False,
                                  "power-saver didn't become disabled", 10)

        def test_power_profile_power_saver_enabled_portal_default(self):
            '''power-saver-enabled property default value'''

            power_saver = dbus.String('power-saver', variant_level=1)
            self.dbus_props.Set('net.hadess.PowerProfiles', 'ActiveProfile', power_saver)

            # A monitor instantiated after the profile switch must already
            # report power-saver as enabled once it has been initialised.
            portal_gtype = GObject.type_from_name('GPowerProfileMonitorPortal')
            fresh_monitor = GObject.new(portal_gtype)
            fresh_monitor.init()
            self.assertTrue(fresh_monitor.get_power_saver_enabled())

except ImportError as e:
    @unittest.skip("Cannot import %s" % (e.name,))
    class TestPowerProfileMonitorPortal(unittest.TestCase):
        '''Stand-in test case reported as skipped when an import is missing.'''

        # Placeholder so that the runner has one test to report as skipped.
        def test_power_profile_power_saver_enabled_portal(self):
            return None

if __name__ == '__main__':
    # Run the tests through the TAP test runner.
    tap_runner = taptestrunner.TAPTestRunner()
    unittest.main(testRunner=tap_runner)
